# For torch functions
import torch
# for Neural network layers
import torch.nn as nn
# For neural network functions:
import torch.nn.functional as F
# For Open ML datasets available in pytorch
from torchvision import transforms
from torchvision.datasets import CIFAR10
from torch.utils.data import DataLoader, Subset, ConcatDataset, Dataset
from torchvision.utils import make_grid
# for Optimization function in pytorch
import torch.optim as optim
from torchsummary import summary
import numpy as np
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px
import plotly as pt
%matplotlib inline
# Speech Denoising Modules
import librosa
import IPython.display as ipd
from scipy.io import wavfile
# --- Environment / device setup (notebook cell; '!' lines are IPython magics) ---
print(torch.__version__)
# NOTE(review): get_device_name(0) raises if no GPU is present, even though the
# device fallback below handles the CPU-only case — confirm a GPU is required.
print(torch.cuda.get_device_name(0))
use_cuda = torch.cuda.is_available()
print(use_cuda)
device = torch.device("cuda:0" if use_cuda else "cpu")
# Let cuDNN pick the fastest kernels for the (fixed) input sizes used here.
torch.backends.cudnn.benchmark = True
device
!nvidia-smi
# --- Load the clean/noisy training pair and build magnitude-spectrogram loaders ---
# Load Audio Files
s, sr=librosa.load('train_clean_male.wav', sr=None)
S=librosa.stft(s, n_fft=1024, hop_length=512)
sn, sr=librosa.load('train_dirty_male.wav', sr=None)
X=librosa.stft(sn, n_fft=1024, hop_length=512)
# Keep only magnitudes; the noisy phase is reused at reconstruction time.
S, X = np.abs(S), np.abs(X)
# Covert to Pytorch tensors:
# (transposed so rows are time frames and columns are the 513 frequency bins)
S = torch.tensor(np.transpose(S)).to(device)
X = torch.tensor(np.transpose(X)).to(device)
# # Dataloaders
BATCH_SIZE = 128
# NOTE(review): these loaders are iterated with zip() during training, which
# relies on X and S having the same number of frames — confirm files align.
TrainLoader = DataLoader(X, batch_size=BATCH_SIZE)
TestLoader = DataLoader(S, batch_size=BATCH_SIZE)
ipd.Audio('train_clean_male.wav')
ipd.Audio('train_dirty_male.wav')
class SpeechDenoiser(nn.Module):
    """Map a 513-bin noisy magnitude-spectrum frame to a clean 513-bin frame.

    Two 1-D conv/pool stages feed a two-layer fully-connected head; the
    output passes through ReLU since magnitudes are non-negative.
    """

    def __init__(self):
        super(SpeechDenoiser, self).__init__()
        # Convolutional feature extractor (channels: 1 -> 16 -> 32).
        self.conv1 = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=1))
        self.conv2 = nn.Sequential(
            nn.Conv1d(16, 32, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2, stride=1))
        # Fully-connected head: 32 channels x 126 positions = 4032 features in.
        self.fc1 = nn.Linear(4032, 2016)
        self.fc2 = nn.Linear(2016, 513)
        # He-initialize only the linear weights.
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.kaiming_normal_(module.weight, a=0, mode='fan_in', nonlinearity='relu')
        self.relu = nn.ReLU()

    def forward(self, x):
        """(batch, 513) noisy magnitudes -> (batch, 513) denoised magnitudes."""
        # Insert the singleton channel axis expected by Conv1d.
        features = self.conv2(self.conv1(x.unsqueeze(1)))
        flat = features.reshape(features.shape[0], -1)
        hidden = self.relu(self.fc1(flat))
        # Non-negative output, matching magnitude spectra.
        return self.relu(self.fc2(hidden))
def train_model(model, epochs=200):
    """Train `model` against the global TrainLoader/TestLoader pair.

    Noisy batches (TrainLoader) are the inputs, clean batches (TestLoader)
    the targets; MSE loss with Adam. Plots mean per-epoch loss afterwards.

    Args:
        model: nn.Module mapping (batch, 513) -> (batch, 513).
        epochs: number of passes over the paired loaders.
    """
    # Loss Function
    criterion = nn.MSELoss()
    # Loss Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    loss_of_each_epoch = []
    for e in tqdm(range(epochs)):
        epoch_loss = 0.0
        n_batches = 0
        for input, output in zip(TrainLoader, TestLoader):
            input = input.to(device)
            output = output.to(device)
            optimizer.zero_grad()
            predictions = model(input)
            loss = criterion(predictions, output)
            loss.backward()               # backpropagate
            optimizer.step()              # update weights
            epoch_loss += loss.item()
            n_batches += 1
        # FIX: record the epoch's mean loss — the original kept only the last
        # batch's loss, a noisy estimate of training progress. The unused
        # train_running_loss / train_acc locals were removed.
        loss_of_each_epoch.append(epoch_loss / max(n_batches, 1))
        print(loss_of_each_epoch[-1])
    plt.plot(range(epochs), loss_of_each_epoch)
    plt.show()
# Instantiate the 1-D conv denoiser on the device, train it, and checkpoint it.
model = SpeechDenoiser().to(device)
summary(model, (513,))
train_model(model,epochs=500)
PATH = './Problem1.pth'
torch.save(model.state_dict(), PATH)
# Load Model
# NN = NeuralNetwork().to(device)
# NN.load_state_dict(torch.load(PATH))
def audio_test(input_file, output_file, model):
    """Denoise `input_file` with `model` and write the result to `output_file`.

    The model predicts magnitude spectra frame-by-frame; the noisy phase is
    reused for the inverse STFT reconstruction.

    Args:
        input_file: path to a wav file to denoise.
        output_file: path for the reconstructed wav.
        model: trained SpeechDenoiser mapping (batch, 513) -> (batch, 513).
    """
    s, sr = librosa.load(input_file, sr=None)
    temp = librosa.stft(s, n_fft=1024, hop_length=512)
    temp_abs = torch.tensor(np.abs(temp))
    temp_abs = np.transpose(temp_abs)
    # Single full-size batch covering every frame.
    TempLoader = torch.utils.data.DataLoader(temp_abs, batch_size=temp_abs.shape[0])
    with torch.no_grad():
        for i in TempLoader:
            i = i.to(device)
            output = model(i)
    # FIX: temp / |temp| produces NaN wherever a bin is exactly zero; clamp the
    # denominator so silent bins reconstruct as zeros instead of NaNs.
    phase = temp / np.maximum(np.abs(temp), 1e-10)
    recov = phase * output.detach().cpu().numpy().T
    recov_istft = librosa.istft(recov, hop_length=512)
    # NOTE(review): librosa.output.write_wav was removed in librosa 0.8 —
    # soundfile.write(output_file, recov_istft, sr) is the modern replacement.
    librosa.output.write_wav(output_file, recov_istft, sr)
# Denoise the two test recordings with the 1-D model, plotting and playing
# the original and reconstructed waveforms for each.
# Testing audio 1
audio_test('test_x_01.wav', 'test_x_01_recons_model.wav', model)
samplingFrequency, signalData = wavfile.read('test_x_01.wav')
plt.plot(signalData)
ipd.Audio('test_x_01.wav')
samplingFrequency, signalData = wavfile.read('test_x_01_recons_model.wav')
plt.plot(signalData)
ipd.Audio('test_x_01_recons_model.wav')
# Testing audio 2
audio_test('test_x_02.wav', 'test_x_02_recons_model.wav', model)
samplingFrequency, signalData = wavfile.read('test_x_02.wav')
plt.plot(signalData)
ipd.Audio('test_x_02.wav')
samplingFrequency, signalData = wavfile.read('test_x_02_recons_model.wav')
plt.plot(signalData)
ipd.Audio('test_x_02_recons_model.wav')
# --- Problem 2: reload the training pair for the 2-D (context window) model ---
# Load Audio Files
s, sr=librosa.load('train_clean_male.wav', sr=None)
S=librosa.stft(s, n_fft=1024, hop_length=512)
sn, sr=librosa.load('train_dirty_male.wav', sr=None)
X=librosa.stft(sn, n_fft=1024, hop_length=512)
# Magnitudes only, transposed to (frames, 513); kept on CPU for windowing.
S = torch.tensor(np.abs(S)).float().T
X = torch.tensor(np.abs(X)).float().T
class AudioDataset(Dataset):
    """Paired (noisy context window, clean frame) dataset.

    Args:
        data: input tensor, one context window per row.
        label: target tensor, one clean frame per row; must match `data` length.

    Raises:
        ValueError: if `data` and `label` have different lengths.
    """
    def __init__(self, data, label):
        # FIX: the original __len__ implicitly returned None when lengths
        # differed, which breaks DataLoader — fail fast here instead.
        if len(data) != len(label):
            raise ValueError('data and label must have the same length')
        self.train = data
        self.label = label

    def __len__(self):
        return len(self.train)

    def __getitem__(self, idx):
        return self.train[idx], self.label[idx]
def data_preprocessing(input, window=20):
    """Stack sliding context windows of `window` consecutive frames.

    Generalized from the original hard-coded (20, 513) window: the feature
    width is taken from the input, and the window length is a parameter with
    the original value as default (backward compatible).

    Args:
        input: (T, F) tensor with T >= window.
        window: number of consecutive frames per window.

    Returns:
        (T - window + 1, window, F) tensor of overlapping windows.
    """
    n_feats = input.shape[1]
    output = [input[i - window:i, :].view(1, window, n_feats)
              for i in range(window, input.shape[0] + 1)]
    return torch.cat(output)
def add_frames(input, n_frames=19, std=0.00005):
    """Prepend `n_frames` rows of near-silent Gaussian noise to `input`.

    Generalized from the original hard-coded 513-bin width: the width is
    taken from the input, and the frame count / noise scale are parameters
    with the original values as defaults (backward compatible).

    Args:
        input: (T, F) tensor of spectrogram frames.
        n_frames: number of silent frames to prepend.
        std: standard deviation of the zero-mean noise.

    Returns:
        (T + n_frames, F) tensor.
    """
    silent_frames = torch.distributions.Normal(0, std)
    frames = silent_frames.sample((n_frames, input.shape[1]))
    return torch.cat((frames, input))
# Pad the noisy input with 19 near-silent frames so every clean frame has a
# full 20-frame context window, then build the dataset and loader.
input = add_frames(X)
input = data_preprocessing(input).to(device)
output = S.to(device)
print(input.shape, output.shape)
ds = AudioDataset(input, output)
train_loader = DataLoader(ds, batch_size=256, num_workers=0)
# Peek at one batch and add a channel axis, mirroring what the model's
# forward() does internally.
x, y = next(iter(train_loader))
x = x[:, np.newaxis,:,:]
x.shape
# Scratch cell: probe the output shape of a candidate 2-D conv stack on one
# batch before committing to the layer sizes in the model.
conv = nn.Sequential(
nn.Conv2d(1, 4, kernel_size=2, stride=1),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=1),
nn.Conv2d(4, 8, kernel_size=2, stride=1),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=1),
nn.Conv2d(8, 16, kernel_size=2, stride=2),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=2)).to(device)
o = conv(x)
o.shape
# NOTE(review): dead scratch cell — `self` is not defined at module level, so
# running these lines raises NameError. Kept for reference only; the real
# layer definitions live in SpeechDenoiser2 below.
self.conv = nn.Sequential(
nn.Conv2d(1, 8, kernel_size=2, stride=1),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=1),
nn.Conv2d(8, 16, kernel_size=2, stride=1),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=1),
nn.Conv2d(16, 32, kernel_size=2, stride=2),
nn.ReLU(),
nn.AvgPool2d(kernel_size=2, stride=2))
self.fc1 = nn.Linear(in_features=16256, out_features=8000)
self.fc2 = nn.Linear(in_features=8000, out_features=1000)
self.fc3 = nn.Linear(in_features=1000, out_features=513)
class SpeechDenoiser2(nn.Module):
    """Denoise one spectrum frame from a (20, 513) window of noisy frames.

    Three 2-D conv/pool stages feed a three-layer MLP head with dropout;
    the final ReLU keeps predicted magnitudes non-negative.
    """

    def __init__(self):
        super(SpeechDenoiser2, self).__init__()
        # Convolutional stages (channels: 1 -> 8 -> 16 -> 32).
        self.conv1 = nn.Sequential(
            nn.Conv2d(1, 8, kernel_size=2, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1))
        self.conv2 = nn.Sequential(
            nn.Conv2d(8, 16, kernel_size=2, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=1))
        self.conv3 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=2, stride=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # MLP head: 32 channels x 4 x 127 spatial = 16256 features in.
        self.fc1 = nn.Linear(16256, 8000)
        self.fc2 = nn.Linear(8000, 1000)
        self.fc3 = nn.Linear(1000, 513)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        """(batch, 20, 513) context windows -> (batch, 513) magnitudes."""
        # Insert the singleton channel axis expected by Conv2d.
        features = self.conv3(self.conv2(self.conv1(x.unsqueeze(1))))
        flat = features.reshape(features.shape[0], -1)
        hidden = self.dropout(self.relu(self.fc1(flat)))
        hidden = self.dropout(self.relu(self.fc2(hidden)))
        # Non-negative output, matching magnitude spectra.
        return self.relu(self.fc3(hidden))
def train_model(model, epochs=200):
    """Train `model` on the global `train_loader` (windows -> clean frames).

    MSE loss with Adam; plots the mean per-epoch loss afterwards.

    Args:
        model: nn.Module mapping (batch, 20, 513) -> (batch, 513).
        epochs: number of passes over train_loader.
    """
    # Loss Function
    criterion = nn.MSELoss()
    # Loss Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    loss_of_each_epoch = []
    for e in tqdm(range(epochs)):
        epoch_loss = 0.0
        n_batches = 0
        # Batches already live on `device` (AudioDataset holds device tensors).
        for input, output in train_loader:
            optimizer.zero_grad()
            predictions = model(input)
            loss = criterion(predictions, output)
            loss.backward()               # backpropagate
            optimizer.step()              # update weights
            epoch_loss += loss.item()
            n_batches += 1
        # FIX: record the epoch's mean loss — the original kept only the last
        # batch's loss. The unused train_running_loss / train_acc were removed.
        loss_of_each_epoch.append(epoch_loss / max(n_batches, 1))
        print(loss_of_each_epoch[-1])
    plt.plot(range(epochs), loss_of_each_epoch)
    plt.show()
# Instantiate the 2-D context-window denoiser, train it, and checkpoint it.
model = SpeechDenoiser2().to(device)
summary(model, (20,513))
train_model(model,epochs=150)
PATH = './Problem2.pth'
torch.save(model.state_dict(), PATH)
# # Load Model
# model = SpeechDenoiser2().to(device)
# model.load_state_dict(torch.load(PATH))
def audio_test(input_file, output_file, model):
    """Denoise `input_file` with the context-window model and write the result.

    Builds (frames, 20, 513) context windows (silent-padded at the start),
    predicts all magnitudes in one batch, and reconstructs with the noisy
    phase.

    Args:
        input_file: path to a wav file to denoise.
        output_file: path for the reconstructed wav.
        model: trained SpeechDenoiser2.

    Returns:
        The predicted magnitude spectrogram, shape (frames, 513).
    """
    # Load Data
    s, sr = librosa.load(input_file, sr=None)
    temp = librosa.stft(s, n_fft=1024, hop_length=512)
    # Create context windows; padding gives the first frame a full window.
    temp_ = torch.tensor(np.abs(temp)).float().T
    temp_ = add_frames(temp_)
    temp_ = data_preprocessing(temp_)
    TempLoader = DataLoader(temp_, batch_size=len(temp_))
    # Predictions (single full-size batch).
    with torch.no_grad():
        for i in TempLoader:
            i = i.to(device)
            output = model(i)
    output_list = output.cpu().numpy()
    # FIX: temp / |temp| produces NaN wherever a bin is exactly zero; clamp the
    # denominator so silent bins reconstruct as zeros instead of NaNs.
    recov = (temp / np.maximum(np.abs(temp), 1e-10)) * output_list.T
    recov_istft = librosa.istft(recov, hop_length=512)
    # NOTE(review): librosa.output.write_wav was removed in librosa 0.8 —
    # soundfile.write(output_file, recov_istft, sr) is the modern replacement.
    librosa.output.write_wav(output_file, recov_istft, sr)
    return output_list
# Evaluate the 2-D model on both test files and on the training input, then
# report training SNR against the clean reference magnitudes.
# Testing audio 1
out = audio_test('test_x_01.wav', 'test_x_01_recons_model2.wav', model)
samplingFrequency, signalData = wavfile.read('test_x_01.wav')
plt.plot(signalData)
ipd.Audio('test_x_01.wav')
samplingFrequency, signalData = wavfile.read('test_x_01_recons_model2.wav')
plt.plot(signalData)
ipd.Audio('test_x_01_recons_model2.wav')
# Testing audio 2
out = audio_test('test_x_02.wav', 'test_x_02_recons_model2.wav', model)
samplingFrequency, signalData = wavfile.read('test_x_02.wav')
plt.plot(signalData)
ipd.Audio('test_x_02.wav')
samplingFrequency, signalData = wavfile.read('test_x_02_recons_model2.wav')
plt.plot(signalData)
ipd.Audio('test_x_02_recons_model2.wav')
pred = audio_test('train_dirty_male.wav', 'train_dirty_male_recons.wav', model)
sam = S.cpu().numpy()
# SNR = 10 * log10(||clean||^2 / ||clean - predicted||^2), on magnitudes.
print(f'SNR of Train Audio => {10*np.log10(np.sum(np.square(sam))/(np.sum(np.square(np.subtract(sam,pred)))))}')
# --- Problem 3: CIFAR10 baseline with a held-out validation split ---
# Transformation
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Load train data
trainset = CIFAR10(root='./data', train=True, download=True, transform=transform)
# Divide Data into train and validation sets.
index = list(range(len(trainset)))
# First 5000 images form the validation set, the remaining 45000 the train set.
v_ind, t_ind = index[:5000], index[5000:]
trainloder = DataLoader(Subset(trainset, t_ind), batch_size=200, num_workers=2)
validloader = DataLoader(Subset(trainset, v_ind), batch_size=5000, num_workers=2)
class BaseCNN(nn.Module):
    """CIFAR10 classifier: two conv/pool stages followed by a two-layer MLP."""

    def __init__(self):
        super().__init__()
        # Feature extractor: (3, 32, 32) -> (10, 14, 14) -> (10, 5, 5).
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # Classifier head over the flattened 10 x 5 x 5 feature map.
        self.fc1 = nn.Linear(10 * 5 * 5, 20)
        self.fc2 = nn.Linear(20, 10)
        # He-initialize only the linear weights.
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.kaiming_normal_(module.weight, a=0, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
        """(N, 3, 32, 32) images -> (N, 10) class logits."""
        features = self.conv2(self.conv1(x))
        flat = features.reshape(features.shape[0], -1)
        hidden = nn.functional.relu(self.fc1(flat))
        return self.fc2(hidden)
# Baseline classifier instance, trained below on the non-augmented split.
model = BaseCNN().to(device)
def get_accuracy(output, target, batch_size):
    """Return the percentage of rows in `output` whose argmax equals `target`."""
    predicted = output.argmax(dim=1).view(target.size())
    hit_count = (predicted == target).sum()
    return (100.0 * hit_count / batch_size).item()
def train_model(epochs=200, model=None, trainloder=None, validloader=None):
    """Train a classifier and record per-epoch train/validation accuracy.

    Args:
        epochs: number of training epochs.
        model: classifier producing (batch, n_classes) logits.
        trainloder: loader of (images, labels) batches (batch size 200).
        validloader: loader yielding the 5000-image validation set.

    Returns:
        (epochs, valid_acc, train_acc) — matching the original signature.
    """
    # Loss Function
    criterion = nn.CrossEntropyLoss()
    # Loss Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    train_acc = []
    valid_acc = []
    for e in tqdm(range(epochs)):
        train_acc_temp = 0.0
        n_batches = 0
        for input, output in trainloder:
            input, output = input.to(device), output.to(device)
            optimizer.zero_grad()
            predictions = model(input)
            loss = criterion(predictions, output)
            loss.backward()               # backpropagate
            optimizer.step()              # update weights
            train_acc_temp += get_accuracy(predictions, output, 200)
            n_batches += 1
        # BUG FIX: the original divided by the last enumerate index `i`
        # (= n_batches - 1), which overstated the epoch-mean accuracy.
        train_acc.append(train_acc_temp / n_batches)
        # Validation Step.
        with torch.no_grad():
            for images, labels in validloader:
                images, labels = images.to(device), labels.to(device)
                output = model(images)
                valid_acc.append(get_accuracy(output, labels, 5000))
    return epochs, valid_acc, train_acc
# Train the baseline for 100 epochs, then plot train/validation accuracy with
# matplotlib and plotly, and report final validation accuracy.
# NOTE(review): the returned `epoch` is the int epoch count and is immediately
# overwritten with list(range(100)) below.
epoch, valid_acc, train_acc = train_model(model=model, epochs=100, trainloder=trainloder, validloader=validloader)
sns.set_theme()
epoch = list(range(100))
plt.figure(figsize=(15, 10))
va = plt.plot(epoch, valid_acc, label='valid_acc')
ta = plt.plot(epoch, train_acc, label='train_acc')
plt.legend(['base_valid_acc', 'base_train_acc'])
plt.title("Accuracy of Non Augmented Data")
plt.xlabel('Number of Epochs')
plt.ylabel("Accuracy (%)")
plt.show()
epoch = list(range(100))
# Create traces
fig = go.Figure()
fig.add_trace(go.Scatter(x=epoch, y=valid_acc,
mode='lines',
name='valid_acc'))
fig.add_trace(go.Scatter(x=epoch, y=train_acc,
mode='lines',
name='train_acc'))
# fig.title
fig.update_layout(
title="Accuracy of Non Augmented Data",
xaxis_title="Number of Epochs",
yaxis_title="Accuracy (%)",
font=dict(
family="Courier New, monospace",
size=18,
color="RebeccaPurple"
),
width=900,
height=600
)
fig.show()
# Accuracy on Test Data
with torch.no_grad():
for images, labels in validloader:
images, labels = images.to(device), labels.to(device)
output = model(images)
test_acc = get_accuracy(output, labels, 5000)
print(f'Accuracy of the network on the 5000 validation images: {test_acc}%')
# Three deterministic augmentations used to enlarge the training split 4x.
# Brightness Transformation
brighter = transforms.Compose(
[transforms.ToTensor(),
transforms.ColorJitter(brightness=(1.1,1.1)),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Darker Transformation
darker = transforms.Compose(
[transforms.ToTensor(),
transforms.ColorJitter(brightness=(0.9,0.9)),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Horizontal Flip
flip = transforms.Compose(
[transforms.ToTensor(),
transforms.RandomHorizontalFlip(p=1),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Each augmented copy is a full CIFAR10 dataset with its own transform.
data_aug1 = CIFAR10(root='./aug1', train=True, download=True, transform=brighter)
data_aug2 = CIFAR10(root='./aug2', train=True, download=True, transform=darker)
data_aug3 = CIFAR10(root='./aug3', train=True, download=True, transform=flip)
def imshow(img):
    """Undo the (0.5, 0.5) normalization and display `img` with matplotlib."""
    unnormalized = img / 2 + 0.5
    as_numpy = unnormalized.numpy()
    plt.figure(figsize=(15, 150))
    # Channels-first (C, H, W) -> channels-last (H, W, C) for imshow.
    plt.imshow(np.transpose(as_numpy, (1, 2, 0)))
    plt.show()
# Preview one image under each augmentation, then flatten the 4x training
# data (3 augmented copies + original, 45000 each) into two tensors.
# get some random training images
n = 11
images = [data_aug1[n][0], data_aug2[n][0], data_aug3[n][0], trainset[n][0]]
# show images
imshow(make_grid(images))
data = ConcatDataset([Subset(data_aug1, t_ind), Subset(data_aug2, t_ind), Subset(data_aug3, t_ind), Subset(trainset, t_ind)])
#input_batch shape: (64, in_channels, 224, 224)
images = []
labels = []
for ch in data:
images.append(ch[0])
labels.append(ch[1])
# result = torch.cat(outputs, dim=1) #shape (64, 32*in_channels, 224, 224)
# NOTE(review): cat along dim=0 of (3,32,32) images yields (540000,32,32); the
# reshape below recovers (180000,3,32,32) — torch.stack(images) would be clearer.
images = torch.cat(images, dim=0)
images = images.reshape(180000, 3, 32, 32)
labels = torch.LongTensor(labels)
class AugmentedCIFAR10(Dataset):
    """Augmented CIFAR10 images/labels served from device-resident tensors."""

    def __init__(self, images, labels):
        # A single bulk transfer to the global `device` up front avoids
        # per-sample host-to-device copies in __getitem__.
        self.images = images.to(device)
        self.labels = labels.to(device)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        sample, target = self.images[idx], self.labels[idx]
        return sample, target
# Loaders over the augmented training set and the original validation split.
cifar_data = AugmentedCIFAR10(images=images, labels=labels)
aug_trainloader = DataLoader(cifar_data, batch_size=200, num_workers=0)
aug_validloader = DataLoader(Subset(trainset, v_ind), batch_size=5000, num_workers=0)
aug_model = BaseCNN().to(device) # Augmented Model
def get_accuracy(output, target, batch_size):
    """Percentage of predictions (row-wise argmax of `output`) matching `target`."""
    matches = torch.max(output, 1)[1].view(target.size()) == target
    return (matches.sum() * 100.0 / batch_size).item()
def train_model(epochs=200, model=None, trainloder=None, validloader=None):
    """Train on the augmented loader and record train/validation accuracy.

    Training batches are already device-resident (AugmentedCIFAR10 moved the
    tensors), so no per-batch transfer is needed.

    Args:
        epochs: number of training epochs.
        model: classifier producing (batch, n_classes) logits.
        trainloder: loader of device-resident (images, labels) batches.
        validloader: loader yielding the 5000-image validation set.

    Returns:
        (epochs, valid_acc, train_acc) — matching the original signature.
    """
    # Loss Function
    criterion = nn.CrossEntropyLoss()
    # Loss Optimizer
    optimizer = optim.Adam(model.parameters(), lr=0.001)
    train_acc = []
    valid_acc = []
    for e in tqdm(range(epochs)):
        train_acc_temp = 0.0
        n_batches = 0
        for input, output in trainloder:
            optimizer.zero_grad()
            predictions = model(input)
            loss = criterion(predictions, output)
            loss.backward()               # backpropagate
            optimizer.step()              # update weights
            train_acc_temp += get_accuracy(predictions, output, 200)
            n_batches += 1
        # BUG FIX: the original divided by the last enumerate index `i`
        # (= n_batches - 1), which overstated the epoch-mean accuracy.
        train_acc.append(train_acc_temp / n_batches)
        # Validation Step.
        with torch.no_grad():
            for images, labels in validloader:
                images, labels = images.to(device), labels.to(device)
                output = model(images)
                valid_acc.append(get_accuracy(output, labels, 5000))
    return epochs, valid_acc, train_acc
# Train a second BaseCNN on the 4x augmented data, overlay all four accuracy
# curves (matplotlib and plotly), and cache/restore the curves via pickle.
epochs, aug_valid_acc, aug_train_acc = train_model(model=aug_model, epochs=100, trainloder=aug_trainloader, validloader=aug_validloader)
sns.set_theme()
epoch = list(range(100))
plt.figure(figsize=(15, 10))
va = plt.plot(epoch, valid_acc, label='valid_acc')
ta = plt.plot(epoch, train_acc, label='train_acc')
ava = plt.plot(epoch, aug_valid_acc, label='aug_valid_acc')
ata = plt.plot(epoch, aug_train_acc, label='aug_train_acc')
plt.legend(['base_valid_acc', 'base_train_acc', 'aug_valid_acc', 'aug_train_acc'])
plt.title("Accuracy of Augmented Vs Non Augmented Data")
plt.xlabel('Number of Epochs')
plt.ylabel("Accuracy (%)")
plt.show()
epoch = list(range(100))
# Create traces
fig = go.Figure()
fig.add_trace(go.Scatter(x=epoch, y=valid_acc,
mode='lines',
name='valid_acc'))
fig.add_trace(go.Scatter(x=epoch, y=train_acc,
mode='lines',
name='train_acc'))
fig.add_trace(go.Scatter(x=epoch, y=aug_valid_acc,
mode='lines', name='aug_valid_acc'))
fig.add_trace(go.Scatter(x=epoch, y=aug_train_acc,
mode='lines', name='aug_train_acc'))
# fig.title
fig.update_layout(
title="Accuracy of Augmented Vs Non Augmented Data",
xaxis_title="Number of Epochs",
yaxis_title="Accuracy (%)",
font=dict(
family="Courier New, monospace",
size=18,
color="RebeccaPurple"
),
width=900,
height=600
)
fig.show()
# Cache the four accuracy curves (write path commented out; read restores).
variables = {'valid_acc':valid_acc,
'train_acc': train_acc,
'aug_train_acc': aug_train_acc,
'aug_valid_acc': aug_valid_acc}
import pickle
# with open('problem3.pickle', 'wb') as f:
# # Pickle the 'data' dictionary using the highest protocol available.
# pickle.dump(variables, f, pickle.HIGHEST_PROTOCOL)
with open('problem3.pickle', 'rb') as f:
# The protocol version used is detected automatically, so we do not
# have to specify it.
variables = pickle.load(f)
valid_acc = variables['valid_acc']
train_acc = variables['train_acc']
aug_train_acc = variables['aug_train_acc']
aug_valid_acc = variables['aug_valid_acc']
# --- Problem 4: self-supervised pretext task — data split ---
transform = transforms.Compose(
[transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
trainset = CIFAR10(root='./data', train=True, download=True, transform=transform)
index = list(range(50000))
# Only the last 500 images keep their labels; the rest are treated as unlabeled.
unlab_ind, lab_ind = index[:49500], index[49500:]
labeled_data = Subset(trainset, lab_ind)
unlabeled_data = Subset(trainset, unlab_ind)
# Count how many labeled examples each of the 10 classes received.
ind = [[] for _ in range(10)]
for lab in labeled_data:
ind[lab[1]].append(lab[1])
for i in ind:
print(f'{i[0]} count is {len(i)}')
# Pretext-task transformations: the network must recognize which was applied.
# Vertically Flipping
vertical_flip = transforms.Compose(
[transforms.ToTensor(),
transforms.RandomVerticalFlip(p=1),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Rotate 90 degree counter clock wise
# (here the rotation runs before ToTensor, i.e. on the PIL image)
rotate90 = transforms.Compose([transforms.RandomRotation([90,90]),
transforms.ToTensor(),
transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
data_aug1 = CIFAR10(root='./aug41', train=True, download=True, transform=vertical_flip)
data_aug2 = CIFAR10(root='./aug42', train=True, download=True, transform=rotate90)
# 3 views per unlabeled image -> expected pretext dataset size.
len(unlab_ind)*3
# Sanity-check one sample's tensor shape.
for data in unlabeled_data:
print(data[0].shape)
break
def imshow(img):
    """Display a (C, H, W) image grid after undoing the 0.5/0.5 normalization."""
    restored = (img / 2 + 0.5).numpy()
    plt.figure(figsize=(15, 150))
    plt.imshow(np.transpose(restored, (1, 2, 0)))  # to (H, W, C)
    plt.show()
# Preview both pretext transforms on one image, then build the pretext
# dataset: label 0 = original, 1 = vertical flip, 2 = 90-degree rotation.
# get some random training images
n = 49499-100
images = [data_aug1[n][0], data_aug2[n][0], unlabeled_data[n][0]]
# show images
imshow(make_grid(images))
# data = ConcatDataset([Subset(trainset, unlab_ind), Subset(data_aug1, unlab_ind), Subset(data_aug2, unlab_ind)])
data_aug1 = Subset(data_aug1, unlab_ind)
data_aug2 = Subset(data_aug2, unlab_ind)
images = []
labels = []
# Unlabeled Data
for data in unlabeled_data:
images.append(data[0])
labels.append(0)
print("Unlabelled Data Done")
# Data Aug 1
for data in data_aug1:
images.append(data[0])
labels.append(1)
print("Vertical Flipping Done")
# Data Aug 2
for data in data_aug2:
images.append(data[0])
labels.append(2)
print("90 degree rotation Done")
for i in range(len(images)):
images[i] = images[i].reshape(1,3,32,32)
images = torch.cat(images, dim=0)
# NOTE(review): after the per-image reshape above, cat already yields
# (148500, 3, 32, 32); this reshape is a no-op kept for safety.
images = images.reshape(148500, 3, 32, 32)
labels = torch.LongTensor(labels)
class UnlabeledCIFAR(Dataset):
    """Pretext-task dataset: transformed CIFAR10 images with transform IDs as labels."""

    def __init__(self, images, labels):
        # Move both tensors to the global `device` once, up front, so
        # __getitem__ is a pure device-side index.
        self.images, self.labels = images.to(device), labels.to(device)

    def __len__(self):
        return self.labels.shape[0]

    def __getitem__(self, idx):
        return self.images[idx], self.labels[idx]
# Pretext dataset/loader: predict which transformation (0/1/2) was applied.
unlabeled_data = UnlabeledCIFAR(images=images, labels=labels)
unlab_trainloader = DataLoader(unlabeled_data, batch_size=512, num_workers=0, shuffle=True)
class UnlabledCNN(nn.Module):
    """Pretext classifier: same trunk as BaseCNN, 3-way transformation head."""

    def __init__(self):
        super().__init__()
        # Feature extractor: (3, 32, 32) -> (10, 14, 14) -> (10, 5, 5).
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # Head: 3 outputs — one per pretext transformation.
        self.fc1 = nn.Linear(10 * 5 * 5, 20)
        self.fc2 = nn.Linear(20, 3)
        # He-initialize only the linear weights.
        for layer in self.modules():
            if isinstance(layer, nn.Linear):
                nn.init.kaiming_normal_(layer.weight, a=0, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
        """(N, 3, 32, 32) images -> (N, 3) transformation logits."""
        out = self.conv2(self.conv1(x))
        out = out.reshape(out.shape[0], -1)
        out = nn.functional.relu(self.fc1(out))
        return self.fc2(out)
# Pretext-task classifier (3 classes: identity / vertical flip / 90-deg rotation).
unlabel_model = UnlabledCNN().to(device)
def get_accuracy(output, target, batch_size):
    """Share (in %) of rows of `output` whose argmax class matches `target`."""
    top_class = output.argmax(1).view(target.size())
    correct = (top_class == target).sum()
    percentage = 100.0 * correct / batch_size
    return percentage.item()
# Train the pretext classifier for 100 epochs and checkpoint the weights.
# Loss Function
criterion = nn.CrossEntropyLoss()
# Loss Optimizer
optimizer = optim.Adam(unlabel_model.parameters(), lr = 0.001)
train_acc = []
# No:of times to train data
for e in tqdm(range(100)):
train_acc_temp = 0
for i, (input, output) in enumerate(unlab_trainloader):
# input, output = input.to(device), output.to(device)
optimizer.zero_grad()
predictions = unlabel_model(input) # Output predictions
loss = criterion(predictions, output) # Loss Caluclation
loss.backward() # Pass loss function gradients to pervious layers:
optimizer.step() # Update Weights
# NOTE(review): the loader batch size is 512 but 256 is passed here, doubling
# each per-batch "accuracy"; the /2 rescale after the loop compensates.
train_acc_temp += get_accuracy(predictions, output, 256)
# NOTE(review): dividing by the last index i (= n_batches - 1) slightly
# overstates the epoch mean; (i + 1) would be the true batch count.
train_acc.append(train_acc_temp/i)
print(loss.item())
train_acc = np.array(train_acc)
train_acc = train_acc/2
PATH = './Pretext.pth'
torch.save(unlabel_model.state_dict(), PATH)
# Load Model
# unlabel_model = UnlabledCNN().to(device)
# unlabel_model.load_state_dict(torch.load(PATH))
test_acc = 0
# Re-evaluate on the (shuffled) pretext training loader.
with torch.no_grad():
for i, (images, labels) in enumerate(unlab_trainloader):
# images, labels = images.to(device), labels.to(device)
output = unlabel_model(images)
test_acc += get_accuracy(output, labels, 512)
# NOTE(review): same off-by-one — dividing by i rather than i + 1.
print(f'Accuracy of the network on the 5000 validation images: {test_acc/i}%')
# FIX: a stray notebook output value ("77.248") had fused onto this class
# definition, making the line a SyntaxError; it has been removed.
class BaseCNN(nn.Module):
    """CIFAR10 classifier: two conv/pool stages and a two-layer MLP head."""

    def __init__(self):
        super().__init__()
        # Convolution Layers: (3, 32, 32) -> (10, 14, 14) -> (10, 5, 5).
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        self.conv2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        # Linear Layer: flattened 10 * 5 * 5 feature map.
        self.fc1 = nn.Linear(10 * 5 * 5, 20)
        self.fc2 = nn.Linear(20, 10)
        # He-initialize the linear weights (stray debug print() removed).
        for m in self.modules():
            if isinstance(m, nn.Linear):
                nn.init.kaiming_normal_(m.weight, a=0, mode='fan_in', nonlinearity='relu')

    def forward(self, x):
        """(N, 3, 32, 32) images -> (N, 10) class logits."""
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.reshape(x.shape[0], -1)
        x = nn.functional.relu(self.fc1(x))
        # Final Layer
        return self.fc2(x)
# Baseline: train from scratch on only the 500 labeled images; evaluate on
# the full 10000-image CIFAR10 test set.
model = BaseCNN().to(device)
trainloader = DataLoader(labeled_data, batch_size=500)
testset = CIFAR10(root='./data', train=False, download=True, transform=transform)
testloader = DataLoader(testset, batch_size=10000)
def get_accuracy(output, target, batch_size):
    """Accuracy (%) of row-wise argmax predictions against `target`."""
    winners = torch.max(output, 1)[1]
    n_correct = (winners.view(target.size()) == target).sum()
    return (n_correct * 100.0 / batch_size).item()
def train_model(epochs=200, model=None, trainloder=None, validloader=None):
    """Long-horizon training loop for the small labeled split.

    Optimizes with Adam + cross-entropy and samples validation (test-set)
    accuracy once every 100 epochs.

    Returns:
        List of validation accuracies, one entry per sampled epoch.
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr = 0.001)
    valid_acc = []
    for epoch in tqdm(range(epochs)):
        for batch_in, batch_target in trainloder:
            optimizer.zero_grad()
            loss = criterion(model(batch_in), batch_target)
            loss.backward()
            optimizer.step()
        # Evaluate only every 100th epoch to keep the 10000-epoch run cheap.
        if epoch % 100 == 0:
            with torch.no_grad():
                for images, labels in validloader:
                    images, labels = images.to(device), labels.to(device)
                    valid_acc.append(get_accuracy(model(images), labels, 10000))
    return valid_acc
# Train the scratch baseline for 10000 epochs on the 500 labeled images and
# plot its test accuracy (sampled every 100 epochs).
base_valid_acc = train_model(model=model, epochs=10000, trainloder=trainloader, validloader=testloader)
sns.set_theme()
epoch = list(range(0, 10000, 100))
plt.figure(figsize=(15, 10))
plt.plot(epoch, base_valid_acc, label='base_valid_acc')
plt.legend(['base_valid_acc'])
plt.title("Test Accuracy of Baseline Model")
plt.xlabel('Number of Epochs')
plt.ylabel("Accuracy (%)")
plt.show()
epoch = list(range(0, 10000, 100))
# Create traces
fig = go.Figure()
fig.add_trace(go.Scatter(x=epoch, y=base_valid_acc,
mode='lines',
name='base_valid_acc'))
# fig.title
fig.update_layout(
title="Test Accuracy of BaseLine model",
xaxis_title="Number of Epochs",
yaxis_title="Accuracy (%)",
font=dict(
family="Courier New, monospace",
size=18,
color="RebeccaPurple"
),
width=1000,
height=700
)
fig.show()
import pickle
# variables = {'base_valid_acc': base_valid_acc}
# with open('problem4base_acc.pickle', 'wb') as f:
# # Pickle the 'data' dictionary using the highest protocol available.
# pickle.dump(variables, f, pickle.HIGHEST_PROTOCOL)
# Restore cached baseline accuracies from a previous run.
with open('problem4base_acc.pickle', 'rb') as f:
# The protocol version used is detected automatically, so we do not
# have to specify it.
variables = pickle.load(f)
base_valid_acc = variables['base_valid_acc']
# Transfer learning: load the pretext weights and swap in a fresh 10-class head.
PATH = './Pretext.pth'
pretrain_model = UnlabledCNN().to(device)
pretrain_model.load_state_dict(torch.load(PATH))
pretrain_model.fc2 = nn.Linear(in_features=20, out_features=10, bias=True).to(device)
nn.init.kaiming_uniform_(pretrain_model.fc2.weight, mode='fan_in', nonlinearity='relu')
def get_accuracy(output, target, batch_size):
    """Return prediction accuracy in percent for one batch of logits."""
    agree = torch.max(output, 1)[1].view(target.size()) == target
    score = 100.0 * agree.sum() / batch_size
    return score.item()
def train_model(epochs=200, model=None, trainloder=None, validloader=None):
    """Fine-tune a pretext-pretrained model on the labeled split.

    Uses per-layer learning rates — the transferred conv/fc1 layers are
    nearly frozen while the fresh fc2 head trains at a normal rate — and
    samples validation (test-set) accuracy every 100 epochs.

    Returns:
        List of validation accuracies, one entry per sampled epoch.
    """
    criterion = nn.CrossEntropyLoss()
    # Per-layer learning rates: tiny on transferred layers, 1e-3 on the head.
    optimizer = optim.Adam([
        {"params": model.conv1.parameters(), "lr": 1e-5},
        {"params": model.conv2.parameters(), "lr": 1e-5},
        {"params": model.fc1.parameters(), "lr": 1e-6},
        {"params": model.fc2.parameters(), "lr": 1e-3},
    ])
    valid_acc = []
    for epoch in tqdm(range(epochs)):
        for batch_in, batch_target in trainloder:
            batch_in, batch_target = batch_in.to(device), batch_target.to(device)
            optimizer.zero_grad()
            loss = criterion(model(batch_in), batch_target)
            loss.backward()
            optimizer.step()
        # Evaluate only every 100th epoch to keep the 10000-epoch run cheap.
        if epoch % 100 == 0:
            with torch.no_grad():
                for images, labels in validloader:
                    images, labels = images.to(device), labels.to(device)
                    valid_acc.append(get_accuracy(model(images), labels, 10000))
    return valid_acc
# Fine-tune the pretrained model, compare it against the baseline curve, and
# persist both accuracy curves with pickle.
pretrain_valid_acc = train_model(model=pretrain_model, epochs=10000, trainloder=trainloader, validloader=testloader)
sns.set_theme()
epoch = list(range(0, 10000, 100))
plt.figure(figsize=(15, 10))
plt.plot(epoch, base_valid_acc, label='base_valid_acc')
plt.plot(epoch, pretrain_valid_acc, label='pretrain_valid_acc')
plt.legend(['base_valid_acc', 'pretrain_valid_acc'])
plt.title("Test Accuracy of BaseLine Vs Pretrained Model")
plt.xlabel('Number of Epochs')
plt.ylabel("Accuracy (%)")
plt.show()
# epoch = list(range(0, 10000, 100))
epoch = list(range(0, 10000, 100))
# Create traces
fig = go.Figure()
fig.add_trace(go.Scatter(x=epoch, y=pretrain_valid_acc,
mode='lines',
name='pretrain_valid_acc'))
fig.add_trace(go.Scatter(x=epoch, y=base_valid_acc,
mode='lines',
name='base_valid_acc'))
# fig.title
fig.update_layout(
title="Test Accuracy of BaseLine Vs Pretrained Model",
xaxis_title="Number of Epochs",
yaxis_title="Accuracy (%)",
font=dict(
family="Courier New, monospace",
size=18,
color="RebeccaPurple"
),
width=1500,
height=800
)
fig.show()
import pickle
variables = {'base_valid_acc': base_valid_acc,
'pretrain_valid_acc': pretrain_valid_acc}
with open('problem4.pickle', 'wb') as f:
# Pickle the 'data' dictionary using the highest protocol available.
pickle.dump(variables, f, pickle.HIGHEST_PROTOCOL)
# with open('problem4.pickle', 'rb') as f:
# # The protocol version used is detected automatically, so we do not
# # have to specify it.
# variables = pickle.load(f)
# print(variables)
# # base_valid_acc = variables['base_valid_acc']
# # pretrained_valid_acc = variables['pretrained_valid_acc']